{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os\n",
    "import random\n",
    "import asyncio\n",
    "import pandas as pd\n",
    "from tqdm import tqdm\n",
    "from typing import List\n",
    "from dataclasses import dataclass, field\n",
    "from aiolimiter import AsyncLimiter\n",
    "from langchain_openai import ChatOpenAI\n",
    "from dotenv import load_dotenv\n",
    "\n",
    "load_dotenv()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dataclass\n",
    "class AsyncLLMAPI:\n",
    "    \"\"\"\n",
    "    大模型API的调用类\n",
    "    \"\"\"\n",
    "\n",
    "    base_url: str\n",
    "    api_key: str  # 每个API的key不一样\n",
    "    uid: int = 0\n",
    "    cnt: int = 0  # 统计每个API被调用了多少次\n",
    "    model: str = \"gpt-3.5-turbo\"\n",
    "    llm: ChatOpenAI = field(init=False)  # 自动创建的对象，不需要用户传入\n",
    "    num_per_second: int = 6  # 限速每秒调用6次\n",
    "\n",
    "    def __post_init__(self):\n",
    "        # 初始化 llm 对象\n",
    "        self.llm = self.create_llm()\n",
    "        # 创建限速器，每秒最多发出 5 个请求\n",
    "        self.limiter = AsyncLimiter(self.num_per_second, 1)\n",
    "\n",
    "    def create_llm(self):\n",
    "        # 创建 llm 对象\n",
    "        return ChatOpenAI(\n",
    "            model=self.model,\n",
    "            base_url=self.base_url,\n",
    "            api_key=self.api_key,\n",
    "        )\n",
    "\n",
    "    async def __call__(self, text):\n",
    "        # 异步协程 限速\n",
    "        self.cnt += 1\n",
    "        async with self.limiter:\n",
    "            return await self.llm.agenerate([text])\n",
    "\n",
    "    @staticmethod\n",
    "    async def _run_task_with_progress(task, pbar):\n",
    "        \"\"\"包装任务以更新进度条\"\"\"\n",
    "        result = await task\n",
    "        pbar.update(1)\n",
    "        return result\n",
    "\n",
    "    @staticmethod\n",
    "    def async_run(\n",
    "        llms: List[\"AsyncLLMAPI\"],\n",
    "        data: List[str],\n",
    "        keyword: str = \"\",  # 文件导出名\n",
    "        output_dir: str = \"output\",\n",
    "        chunk_size=500,\n",
    "    ):\n",
    "\n",
    "        async def _func(llms, data):\n",
    "            \"\"\"\n",
    "            异步请求处理一小块数据\n",
    "            \"\"\"\n",
    "            results = [llms[i % len(llms)](text) for i, text in enumerate(data)]\n",
    "            with tqdm(total=len(results)) as pbar:\n",
    "                results = await asyncio.gather(\n",
    "                    *[\n",
    "                        AsyncLLMAPI._run_task_with_progress(task, pbar)\n",
    "                        for task in results\n",
    "                    ]\n",
    "                )\n",
    "            return results\n",
    "\n",
    "        idx = 0\n",
    "        all_df = []\n",
    "        while idx < len(data):\n",
    "            file = f\"{idx}_{keyword}.csv\"\n",
    "            file_dir = os.path.join(output_dir, file)\n",
    "\n",
    "            if os.path.exists(file_dir):\n",
    "                print(f\"{file_dir} already exist! Just skip.\")\n",
    "                tmp_df = pd.read_csv(file_dir)\n",
    "            else:\n",
    "                tmp_data = data[idx : idx + chunk_size]\n",
    "\n",
    "                loop = asyncio.get_event_loop()\n",
    "                tmp_result = loop.run_until_complete(_func(llms=llms, data=tmp_data))\n",
    "                tmp_result = [item.generations[0][0].text for item in tmp_result]\n",
    "                tmp_df = pd.DataFrame({\"infer\": tmp_result})\n",
    "\n",
    "                # 如果文件夹不存在，则创建\n",
    "                if not os.path.exists(tmp_folder := os.path.dirname(file_dir)):\n",
    "                    os.makedirs(tmp_folder)\n",
    "\n",
    "                tmp_df.to_csv(file_dir, index=False)\n",
    "\n",
    "            all_df.append(tmp_df)\n",
    "            idx += chunk_size\n",
    "\n",
    "        all_df = pd.concat(all_df)\n",
    "        all_df.to_csv(os.path.join(output_dir, f\"all_{keyword}.csv\"), index=False)\n",
    "        return all_df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "def load_json(file_name):\n",
    "    with open(file_name, \"r\", encoding=\"utf-8\") as f:\n",
    "        return json.load(f)\n",
    "    return data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = load_json(\"calculate.json\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{'instruction': '',\n",
       " 'input': '你是一名擅长数学运算的助手，负责逐步推理并解决四则运算问题。请按照以下步骤进行：\\n\\n    1. 阅读并理解问题。\\n    2. 分步计算，逐步解决问题。\\n    3. 给出最终的结果。\\n    4. 按照 JSON 格式输出结果，包括：\\n    - reason: 详细的推理过程。\\n    - infer: 最终的计算结果。\\n\\n    问题：58 + 15 + 17\\n    请给出分析和结果。',\n",
       " 'output': '90'}"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "data[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "llm = AsyncLLMAPI(\n",
    "    base_url=\"http://localhost:{}/v1\".format(os.environ.get(\"API_PORT\", 8000)),\n",
    "    api_key=\"{}\".format(os.environ.get(\"API_KEY\", \"0\")),\n",
    "    num_per_second=10,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "texts = [item[\"input\"] for item in data]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'你是一名擅长数学运算的助手，负责逐步推理并解决四则运算问题。请按照以下步骤进行：\\n\\n    1. 阅读并理解问题。\\n    2. 分步计算，逐步解决问题。\\n    3. 给出最终的结果。\\n    4. 按照 JSON 格式输出结果，包括：\\n    - reason: 详细的推理过程。\\n    - infer: 最终的计算结果。\\n\\n    问题：58 + 15 + 17\\n    请给出分析和结果。'"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "texts[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "ename": "RuntimeError",
     "evalue": "This event loop is already running",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mRuntimeError\u001b[0m                              Traceback (most recent call last)",
      "Cell \u001b[0;32mIn[9], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mAsyncLLMAPI\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43masync_run\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m      2\u001b[0m \u001b[43m    \u001b[49m\u001b[43m[\u001b[49m\u001b[43mllm\u001b[49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtexts\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeyword\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43m数学计算\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43moutput_dir\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43moutput\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mchunk_size\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m100\u001b[39;49m\n\u001b[1;32m      3\u001b[0m \u001b[43m)\u001b[49m\n",
      "Cell \u001b[0;32mIn[2], line 78\u001b[0m, in \u001b[0;36mAsyncLLMAPI.async_run\u001b[0;34m(llms, data, keyword, output_dir, chunk_size)\u001b[0m\n\u001b[1;32m     75\u001b[0m tmp_data \u001b[38;5;241m=\u001b[39m data[idx : idx \u001b[38;5;241m+\u001b[39m chunk_size]\n\u001b[1;32m     77\u001b[0m loop \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mget_event_loop()\n\u001b[0;32m---> 78\u001b[0m tmp_result \u001b[38;5;241m=\u001b[39m \u001b[43mloop\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrun_until_complete\u001b[49m\u001b[43m(\u001b[49m\u001b[43m_func\u001b[49m\u001b[43m(\u001b[49m\u001b[43mllms\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mllms\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mtmp_data\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m     79\u001b[0m tmp_result \u001b[38;5;241m=\u001b[39m [item\u001b[38;5;241m.\u001b[39mgenerations[\u001b[38;5;241m0\u001b[39m][\u001b[38;5;241m0\u001b[39m]\u001b[38;5;241m.\u001b[39mtext \u001b[38;5;28;01mfor\u001b[39;00m item \u001b[38;5;129;01min\u001b[39;00m tmp_result]\n\u001b[1;32m     80\u001b[0m tmp_df \u001b[38;5;241m=\u001b[39m pd\u001b[38;5;241m.\u001b[39mDataFrame({\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124minfer\u001b[39m\u001b[38;5;124m\"\u001b[39m: tmp_result})\n",
      "File \u001b[0;32m~/anaconda3/envs/llm_beta/lib/python3.11/asyncio/base_events.py:629\u001b[0m, in \u001b[0;36mBaseEventLoop.run_until_complete\u001b[0;34m(self, future)\u001b[0m\n\u001b[1;32m    618\u001b[0m \u001b[38;5;250m\u001b[39m\u001b[38;5;124;03m\"\"\"Run until the Future is done.\u001b[39;00m\n\u001b[1;32m    619\u001b[0m \n\u001b[1;32m    620\u001b[0m \u001b[38;5;124;03mIf the argument is a coroutine, it is wrapped in a Task.\u001b[39;00m\n\u001b[0;32m   (...)\u001b[0m\n\u001b[1;32m    626\u001b[0m \u001b[38;5;124;03mReturn the Future's result, or raise its exception.\u001b[39;00m\n\u001b[1;32m    627\u001b[0m \u001b[38;5;124;03m\"\"\"\u001b[39;00m\n\u001b[1;32m    628\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_check_closed()\n\u001b[0;32m--> 629\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_check_running\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m    631\u001b[0m new_task \u001b[38;5;241m=\u001b[39m \u001b[38;5;129;01mnot\u001b[39;00m futures\u001b[38;5;241m.\u001b[39misfuture(future)\n\u001b[1;32m    632\u001b[0m future \u001b[38;5;241m=\u001b[39m tasks\u001b[38;5;241m.\u001b[39mensure_future(future, loop\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m)\n",
      "File \u001b[0;32m~/anaconda3/envs/llm_beta/lib/python3.11/asyncio/base_events.py:588\u001b[0m, in \u001b[0;36mBaseEventLoop._check_running\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    586\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m_check_running\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n\u001b[1;32m    587\u001b[0m     \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mis_running():\n\u001b[0;32m--> 588\u001b[0m         \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mThis event loop is already running\u001b[39m\u001b[38;5;124m'\u001b[39m)\n\u001b[1;32m    589\u001b[0m     \u001b[38;5;28;01mif\u001b[39;00m events\u001b[38;5;241m.\u001b[39m_get_running_loop() \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m    590\u001b[0m         \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\n\u001b[1;32m    591\u001b[0m             \u001b[38;5;124m'\u001b[39m\u001b[38;5;124mCannot run the event loop while another loop is running\u001b[39m\u001b[38;5;124m'\u001b[39m)\n",
      "\u001b[0;31mRuntimeError\u001b[0m: This event loop is already running"
     ]
    }
   ],
   "source": [
    "AsyncLLMAPI.async_run(\n",
    "    [llm], texts, keyword=\"数学计算\", output_dir=\"output\", chunk_size=100\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "llm_beta",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
